Amazon AthenaでETLした結果をRedshiftにロードしてみる
先日の Black Beltオンラインセミナー「Amazon Athena」で、Athenaの想定ユースケースとして「開発者が、大規模でない生データに対して、低頻度でETL処理をする」という話を聞き、実際に Athena で ETL して Redshift にロードしてみることにしました。
サンプルデータ
サンプルデータは下記データをダウンロードし、対象シートを[Orders]のみをS3ファイルに保存して、Athenaで orders_jp テーブルとして定義したものを利用します。
Superstoreサンプルデータ(不具合修正版) |Tableau Community
シナリオ
日本国内の売上情報が含まれるデータファイルがS3上に存在する。データファイルから製品カテゴリ〜製品毎の売上と利益率のデータマートを作成して、BIツールから参照できるようにこのデータマートをRedshiftにロードしたい。
さあ、やってみましょう。
データマートの作成とロード
以下の手順で、AthenaでS3上のデータファイルをETLして、その結果をRedshiftにロードします。
- Athena でデータファイルを orders_jp テーブルとして定義する
- Athena の orders_jp テーブルに対して集計クエリを実行する
- S3に保存される集計クエリの実行結果をロード用ディレクトリにコピー
- Redshiftにデータマートをロードする
1. Athena でデータファイルを orders_jp テーブルとして定義する
最初はAthenaでS3上のデータファイルを以下の orders_jp というテーブルにマッピングします。
CREATE EXTERNAL TABLE IF NOT EXISTS orders_jp ( order_id string, order_date string, order_priority string, order_quantity int, sales double, discount double, ship_mode string, profit int, unit_price int, advertising_expenses double, shipping_cost int, customer_name string, prefecture string, city string, region string, shop_name string, customer_segment string, product_category string, product_sub_category string, product_id string, product_name string, product_description string, product_container string, product_base_margin double, supplier string, delivery_scheduled_date string, shipping_date string ) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' WITH SERDEPROPERTIES ( 'serialization.format' = '\t', 'field.delim' = '\t' ) LOCATION 's3://cm-datalake/orders_jp/';
クエリーが実行できるようになりました。S3に格納されたデータは以下のようなデータです。
2. Athena の orders_jp テーブルに対して集計クエリを実行する
シナリオに従い、「データファイルから製品カテゴリ〜製品毎の売上と利益率」のデータマートを作成する集計クエリーを実行します。
SELECT product_category, product_sub_category, product_name, cast(sum(sales) AS bigint) AS sales, cast(sum(profit) AS bigint) AS profit FROM orders_jp GROUP BY 1,2,3 ORDER BY 1,2,3 ;
以下のような結果が得られました。
3. S3に保存される集計クエリの実行結果をロード用ディレクトリにコピー
クエリーの集計結果は、Settingsの「Query result location」に指定したパスのUnsavedディレクトリの下に、更に年・月・日のディレクトリ作成してデータファイルを自動的に保存します。1回のクエリ毎にデータ(.csv)とメタデータ(.metadata)の2つのファイルが保存されます。
今回は必要となるデータ(.csv)のファイルのみをRedshiftのロードデータ用のディレクトリにコピーします。
$ aws s3 cp s3://aws-athena-query-results-xxxxxxxxxxxx-us-east-1/Unsaved/2017/03/07/4db339da-201b-45b9-b083-4612288047c0.csv s3://cm-datalake/athena_etl/order_summary.csv copy: s3://aws-athena-query-results-xxxxxxxxxxxx-us-east-1/Unsaved/2017/03/07/4db339da-201b-45b9-b083-4612288047c0.csv to s3://cm-datalake/athena_etl/order_summary.csv
4. Redshiftにデータマートをロードする
まず、Redshiftにデータマートをロードするためのテーブルを準備します。
cmdb=> CREATE TABLE orders_summary ( cmdb(> category VARCHAR(128), cmdb(> sub_category VARCHAR(128), cmdb(> product_name VARCHAR(128), cmdb(> sales BIGINT, cmdb(> profit BIGINT cmdb(> ) cmdb-> ; CREATE TABLE
データをロードする前に、ファイル形式を確認します。
"product_category","product_sub_category","product_name","sales","profit" "テクノロジー","コピー機とファックス","Brother DCP1000 Digital 3 in 1 Multifunction Machine","5380949","577265" "テクノロジー","コピー機とファックス","Canon Image Class D660 Copier","10549240","1002616" "テクノロジー","コピー機とファックス","Canon Imageclass D680 Copier / Fax","3863655","554255" "テクノロジー","コピー機とファックス","Canon PC-428 Personal Copier","5782544","988122"
データ(.csv)は先頭行にカラム名のヘッダファイルを持ちます。また、各フィールドの全ての値はダブルクオート「"」によって囲まれています。この点を考慮してCOPYコマンドのオプションを指定します。
最後にデータをロードします。先頭行をスキップするために「IGNOREHEADER AS 1」を指定します。また、カラムデータのダブルクオート「"」を除去するため「CSV QUOTE '"'」の指定も必要です。
cmdb=> COPY orders_summary cmdb-> FROM 's3://cm-datalake/athena_etl/order_summary.csv' cmdb-> CREDENTIALS 'aws_access_key_id=<aws_access_key_id>;aws_secret_access_key=<aws_secret_access_key>' cmdb-> CSV QUOTE '"' cmdb-> IGNOREHEADER AS 1 cmdb-> ; INFO: Load into table 'orders_summary' completed, 1282 record(s) loaded successfully. COPY
以下の通り、意図した形式で格納されたことが確認できます。
cmdb=> select * from orders_summary limit 5; category | sub_category | product_name | sales | profit --------------+------------------------+------------------------------------------------------+----------+---------- テクノロジー | コピー機とファックス | Canon PC-428 Personal Copier | 5782544 | 988122 テクノロジー | コピー機とファックス | Canon imageCLASS 2200 Advanced Copier | 10813685 | -3149376 テクノロジー | コピー機とファックス | Sharp AL-1530CS Digital Copier | 16255847 | 1177030 テクノロジー | コンピューター周辺機器 | Acco Keyboard-In-A-Box® | 306083 | -2916 テクノロジー | コンピューター周辺機器 | Belkin 107-key enhanced keyboard, USB/PS/2 interface | 363399 | -50937 (5 rows)
最後に
Amazon Athenaのクエリ結果はS3ファイルとして保存されるので、この機能を利用することで簡易なETLが可能であることが確認できました。Redshiftでは、Nested−JSONの構造化データへの変換 や カラムの中のカンマ区切りデータを複数レコードに分解することができないという課題がありましたが、Athenaと組み合わせることで解決できます。今回はクエリ結果のS3ファイルを手動でコピーしてRedshiftにロードしましたが、これらをプログラムで自動化することも可能です。機会があればブログで紹介したいと思います。
昨年のre:Invent2016 でフルマネージドのETLサービス AWS Glue が発表されましたが、本日時点でサービス開始されていません。ETLは将来的に AWS Glue が第一候補となると考えられますが、個人的にはワークロードに合わせ適材適所で GlueのSpark と AthenaのPresto、より大規模で全てに対応できるEMRと 使い分けられるとベストではないかと考えています。